package com.jie.flink.cdc.service;

import com.jie.flink.cdc.config.SpringContextUtil;
import com.jie.flink.cdc.datafilter.DatabaseReadDataFilter;
import com.jie.flink.cdc.doman.PgReplicationSlot;
import com.jie.flink.cdc.flinksource.PostgreSQLDeserialization;
import com.jie.flink.cdc.flinksource.config.PostgreSQLSourceConfigProperties;
import com.jie.flink.cdc.flinksource.config.SourceConfigWashMapper;
import com.jie.flink.cdc.mapper.PgReplicationSlotMapper;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


/**
 * Flink CDC service that listens for data changes in a PostgreSQL database.
 *
 * <p>The service builds a Debezium-backed {@link PostgreSQLSource} from the supplied
 * {@link PostgreSQLSourceConfigProperties} and registers it as the stream source of the
 * Flink job.
 *
 * @author zhanggj
 * @date 2023/1/31
 */
@Slf4j
public class PostgreSQLFlinkCdcService extends FlinkCdcService {

    private PostgreSQLSourceConfigProperties dataChangeSourceConfig;

    /**
     * Prefix of every replication slot name used by this service. It is also used verbatim
     * as the slot name when a fixed (non-random) slot is requested.
     */
    private static final String SLOT_NAME_PREFIX = "flink_slot";

    /**
     * Value of {@link PgReplicationSlot#getActive()} that marks a replication slot as inactive.
     */
    private static final String SLOT_ACTIVE_FALSE = "f";

    /**
     * Creates the service.
     *
     * @param dataChangeSourceConfig connection and capture settings of the PostgreSQL source
     */
    public PostgreSQLFlinkCdcService(final PostgreSQLSourceConfigProperties dataChangeSourceConfig) {
        this.dataChangeSourceConfig = dataChangeSourceConfig;
    }

    /**
     * Registers the PostgreSQL CDC source on the given environment with a parallelism of 1.
     *
     * @param env the Flink execution environment the source is added to
     * @return the stream source emitting the deserialized change records
     */
    @Override
    protected DataStreamSource<String> buildFlinkStreamSource(final StreamExecutionEnvironment env) {
        // A commented-out RocketMQ sink sketch (annotated "batching not supported") used to
        // follow the return statement; it was dead code and has been removed.
        return env.addSource(buildPostgreSQLDataSource(), "PostgreSQL-source")
                .setParallelism(1);
    }

    /**
     * Builds the Debezium source function that captures changes from PostgreSQL.
     *
     * <p>Every Debezium option is written twice, once with and once without the
     * {@code debezium.} prefix, exactly as before.
     * NOTE(review): presumably only the un-prefixed key is honoured when the properties are
     * handed over through {@code debeziumProperties(...)} - confirm before removing either one.
     *
     * @return the configured source function
     * @throws IllegalStateException if a random slot name is requested and the thread is
     *                               interrupted while waiting for an active slot to be released
     */
    public DebeziumSourceFunction<String> buildPostgreSQLDataSource() {
        final Properties properties = new Properties();

        // One replication slot records the capture position of one client, so by default a
        // fixed slot name is used: after an outage the service can continue capturing from its
        // checkpoint. A random slot name is used only when the checkpoint is to be renewed.
        // Boolean.TRUE.equals(...) treats an unset (null) flag as "false" instead of failing
        // with a NullPointerException on unboxing.
        final String slotName = Boolean.TRUE.equals(dataChangeSourceConfig.getCheckPointRenew())
                ? randomSlotName()
                : SLOT_NAME_PREFIX;
        setDebeziumProperty(properties, "slot.name", slotName);

        // NOTE(review): "slot.drop.on.top" looks like a misspelling of Debezium's documented
        // "slot.drop.on.stop" option, so as written it presumably has no effect. The key is
        // deliberately left untouched: correcting it would make Debezium drop the slot on a
        // graceful stop, which conflicts with resuming from the fixed slot described above.
        // Confirm the intended behaviour before renaming it.
        setDebeziumProperty(properties, "slot.drop.on.top", "true");

        // For further options see the Debezium PostgreSQL connector documentation
        // https://debezium.io/documentation/reference/1.2/connectors/postgresql.html#postgresql-property-snapshot-mode
        // or the Alibaba Cloud documentation https://help.aliyun.com/document_detail/184861.html

        final PostgreSQLDeserialization deserialization =
                new PostgreSQLDeserialization(new DatabaseReadDataFilter());

        // IMPORTANT - condition under which the connector takes a snapshot at start-up:
        //   initial      - snapshot only when no offset has been recorded for the logical server.
        //   always       - snapshot on every start.
        //   never        - never snapshot.
        //   initial_only - take the initial snapshot, then stop without processing later changes.
        //   exported     - snapshot as of the point in time the replication slot was created
        //                  (a lock-free way of taking the snapshot).
        //   custom       - snapshot as defined by the snapshot.custom.class property.
        // "initial" is the default here; "never" is used when the initial read is to be ignored.
        final String snapshotMode =
                Boolean.TRUE.equals(dataChangeSourceConfig.getInitReadIgnore()) ? "never" : "initial";
        setDebeziumProperty(properties, "snapshot.mode", snapshotMode);

        return PostgreSQLSource.<String>builder()
                .hostname(dataChangeSourceConfig.getHostName())
                .port(dataChangeSourceConfig.getPort())
                .username(dataChangeSourceConfig.getUserName())
                .password(dataChangeSourceConfig.getPassword())
                .database(dataChangeSourceConfig.getDatabase())
                .schemaList(dataChangeSourceConfig.getSchemaArray())
                .tableList(SourceConfigWashMapper.tableNameWash(dataChangeSourceConfig.getTableArray()))
                .decodingPluginName("pgoutput")
                .deserializer(deserialization)
                .debeziumProperties(properties)
                .build();
    }

    /**
     * Stores a Debezium option under both its {@code debezium.}-prefixed and its plain key.
     *
     * @param properties the property set to write to
     * @param key        the un-prefixed Debezium option name
     * @param value      the option value
     */
    private static void setDebeziumProperty(final Properties properties, final String key, final String value) {
        properties.setProperty("debezium." + key, value);
        properties.setProperty(key, value);
    }

    /**
     * Produces a random replication slot name.
     *
     * <p>Used when the previous checkpoint is to be discarded on every service start, so that
     * each start gets a replication slot of its own. Before the name is generated, inactive
     * slots carrying the flink prefix are deleted; while a prefixed slot is still active the
     * method sleeps 10 seconds and checks again.
     *
     * @return a unique replication slot name
     * @throws IllegalStateException if the thread is interrupted while waiting; the interrupt
     *                               status is restored before the exception is thrown
     */
    private String randomSlotName() {
        final PgReplicationSlotMapper pgReplicationSlotMapper = SpringContextUtil.getBean(PgReplicationSlotMapper.class);

        // Query the existing replication slots.
        List<PgReplicationSlot> slotList = pgReplicationSlotMapper.selectAll();
        while (CollectionUtils.isNotEmpty(slotList)) {
            final String activeSlot = dropInactiveSlotsUntilActive(pgReplicationSlotMapper, slotList);
            if (StringUtils.isBlank(activeSlot)) {
                break;
            }
            log.info("有活跃复制曹{}， 休眠10s", activeSlot);
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop waiting: continuing the loop with the
                // flag set would make every following sleep fail immediately and turn the
                // wait into a busy loop against the database.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while waiting for replication slot " + activeSlot + " to become inactive", e);
            }
            slotList = pgReplicationSlotMapper.selectAll();
        }

        return SLOT_NAME_PREFIX.concat(getUUID());
    }

    /**
     * Walks the slots in order, deleting inactive flink-prefixed slots, and stops at the first
     * flink-prefixed slot that is not marked inactive.
     *
     * @param pgReplicationSlotMapper mapper used to delete slots
     * @param slotList                the slots to inspect
     * @return the name of the first active flink-prefixed slot, or {@code null} if there is none
     */
    private String dropInactiveSlotsUntilActive(final PgReplicationSlotMapper pgReplicationSlotMapper,
                                                final List<PgReplicationSlot> slotList) {
        for (PgReplicationSlot slot : slotList) {
            if (!StringUtils.startsWith(slot.getSlotName(), SLOT_NAME_PREFIX)) {
                continue;
            }
            if (!SLOT_ACTIVE_FALSE.equals(slot.getActive())) {
                return slot.getSlotName();
            }
            // Delete the old, inactive replication slot.
            pgReplicationSlotMapper.deleteSlot(slot.getSlotName());
        }
        return null;
    }

    /**
     * Generates a random UUID rendered as lower-case text without dashes.
     *
     * @return the dash-free UUID string
     */
    private String getUUID() {
        return StringUtils.replace(UUID.randomUUID().toString(), "-", "").toLowerCase();
    }

}
